/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;

/**
 * The job submitter's view of the Job.
 * 
 * <p>
 * It allows the user to configure the job, submit it, control its execution,
 * and query the state. The set methods only work until the job is submitted;
 * afterwards they will throw an IllegalStateException.
 * </p>
 * 
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.
 * </p>
 * 
 * <p>
 * Here is an example of how to submit a job:
 * </p>
 * <blockquote>
 * 
 * <pre>
 * // Create a new Job
 * Job job = new Job(new Configuration());
 * job.setJarByClass(MyJob.class);
 * 
 * // Specify various job-specific parameters
 * job.setJobName(&quot;myjob&quot;);
 * 
 * FileInputFormat.addInputPath(job, new Path(&quot;in&quot;));
 * FileOutputFormat.setOutputPath(job, new Path(&quot;out&quot;));
 * 
 * job.setMapperClass(MyJob.MyMapper.class);
 * job.setReducerClass(MyJob.MyReducer.class);
 * 
 * // Submit the job, then poll for progress until the job is complete
 * job.waitForCompletion(true);
 * </pre>
 * 
 * </blockquote>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings({ "deprecation", "rawtypes" })
public class Job extends JobContextImpl implements JobContext {
	@InterfaceStability.Evolving
	public static enum JobState {
		DEFINE, RUNNING
	}

	@InterfaceStability.Evolving
	public static enum TaskStatusFilter {
		NONE, KILLED, FAILED, SUCCEEDED, ALL
	}

	private static final Log LOG = LogFactory.getLog(Job.class);
	private static final long MAX_JOBSTATUS_AGE = 1000 * 2; // ms; see ensureFreshStatus()
	public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";

	/** Key in mapred-*.xml that sets completionPollIntervalMillis */
	public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
	/** Default completionPollIntervalMillis is 5000 ms. */
	static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
	/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
	public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = "mapreduce.client.progressmonitor.pollinterval";

	/** Default progMonitorPollIntervalMillis is 1000 ms. */
	static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
	public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";

	public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication";

	static {
		ConfigUtil.loadResources();
	}

	/** The interval, in milliseconds, at which waitForCompletion() should check. */
	public static int getCompletionPollInterval(Configuration conf) {
		int completionPollIntervalMillis = conf.getInt(
				COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
		if (completionPollIntervalMillis < 1) {
			LOG.warn(COMPLETION_POLL_INTERVAL_KEY
					+ " has been set to an invalid value; " + "replacing with "
					+ DEFAULT_COMPLETION_POLL_INTERVAL);
			completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
		}
		return completionPollIntervalMillis;
	}

	/**
	 * Creates a new {@link Job} for the given cluster, with a fresh
	 * {@link Configuration}.
	 */
	public static Job getInstance(Cluster cluster) throws IOException {
		return new Job(cluster);
	}

	/**
	 * Creates a new {@link Job} for the given cluster and configuration.
	 */
	public static Job getInstance(Cluster cluster, Configuration conf)
			throws IOException {
		return new Job(cluster, conf);
	}

	/**
	 * Creates a new {@link Job} for the given cluster, pre-populated with the
	 * given {@link JobStatus}; the returned job is already in the RUNNING
	 * state.
	 */
	public static Job getInstance(Cluster cluster, JobStatus status,
			Configuration conf) throws IOException {
		return new Job(cluster, status, conf);
	}

	/** The interval, in milliseconds, at which monitorAndPrintJob() prints status. */
	public static int getProgressPollInterval(Configuration conf) {
		// Read progress monitor poll interval from config. Default is 1 second.
		int progMonitorPollIntervalMillis = conf.getInt(
				PROGRESS_MONITOR_POLL_INTERVAL_KEY,
				DEFAULT_MONITOR_POLL_INTERVAL);
		if (progMonitorPollIntervalMillis < 1) {
			LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY
					+ " has been set to an invalid value; "
					+ " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
			progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
		}
		return progMonitorPollIntervalMillis;
	}

	/**
	 * Get the task output filter.
	 * 
	 * @param conf
	 *            the configuration.
	 * @return the filter level.
	 */
	public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
		return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
	}

	/**
	 * Modify the Configuration to set the task output filter.
	 * 
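	 * <p>A minimal sketch (the filter level shown is illustrative):</p>
	 * <pre>
	 * Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);
	 * </pre>
	 * 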
	 * @param conf
	 *            the Configuration to modify.
	 * @param newValue
	 *            the value to set.
	 */
	public static void setTaskOutputFilter(Configuration conf,
			TaskStatusFilter newValue) {
		conf.set(Job.OUTPUT_FILTER, newValue.toString());
	}

	protected JobState state = JobState.DEFINE;

	protected JobStatus status;

	private long statustime;

	protected Cluster cluster;

	@Deprecated
	public Job() throws IOException {
		this(new Configuration());
	}

	Job(Cluster cluster) throws IOException {
		this(cluster, new Configuration());
	}

	Job(Cluster cluster, Configuration conf) throws IOException {
		super(conf, null);
		this.cluster = cluster;
	}

	Job(Cluster cluster, JobStatus status, Configuration conf)
			throws IOException {
		this(cluster, conf);
		setJobID(status.getJobID());
		this.status = status;
		state = JobState.RUNNING;
	}

	@Deprecated
	public Job(Configuration conf) throws IOException {
		this(new Cluster(conf), conf);
	}

	@Deprecated
	public Job(Configuration conf, String jobName) throws IOException {
		this(conf);
		setJobName(jobName);
	}

	/**
	 * Add an archive path to the current set of classpath entries. It adds the
	 * archive to the cache as well.
	 * 
	 * @param archive
	 *            Path of the archive to be added
	 */
	public void addArchiveToClassPath(Path archive) throws IOException {
		ensureState(JobState.DEFINE);
		DistributedCache.addArchiveToClassPath(archive, conf);
	}

	/**
	 * Add an archive to be localized.
	 * 
	 * @param uri
	 *            The uri of the cache to be localized
	 */
	public void addCacheArchive(URI uri) {
		ensureState(JobState.DEFINE);
		DistributedCache.addCacheArchive(uri, conf);
	}

	/**
	 * Add a file to be localized.
	 * 
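	 * <p>A minimal sketch (the path and <code>#lookup</code> fragment are
	 * illustrative; the fragment names the symlink created when
	 * {@link #createSymlink()} is enabled):</p>
	 * <pre>
	 * job.addCacheFile(new URI(&quot;/app/data/lookup.dat#lookup&quot;));
	 * </pre>
	 * 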
	 * @param uri
	 *            The uri of the cache to be localized
	 */
	public void addCacheFile(URI uri) {
		ensureState(JobState.DEFINE);
		DistributedCache.addCacheFile(uri, conf);
	}

	/**
	 * Add a file path to the current set of classpath entries. It adds the file
	 * to the cache as well.
	 * 
	 * @param file
	 *            Path of the file to be added
	 */
	public void addFileToClassPath(Path file) throws IOException {
		ensureState(JobState.DEFINE);
		DistributedCache.addFileToClassPath(file, conf);
	}

	/**
	 * Get the <i>progress</i> of the job's cleanup-tasks, as a float between
	 * 0.0 and 1.0. When all cleanup tasks have completed, the function returns
	 * 1.0.
	 * 
	 * @return the progress of the job's cleanup-tasks.
	 * @throws IOException
	 */
	public float cleanupProgress() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		ensureFreshStatus();
		return status.getCleanupProgress();
	}

	/**
	 * This method allows you to create symlinks in the current working
	 * directory of the task to all the cache files/archives.
	 */
	public void createSymlink() {
		ensureState(JobState.DEFINE);
		DistributedCache.createSymlink(conf);
	}

	private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
			throws IOException {
		// The tasktracker for a 'failed/killed' job might not be around...
		if (baseUrl != null) {
			// Construct the url for the tasklogs
			String taskLogUrl = getTaskLogURL(taskId, baseUrl);

			// Copy task's stdout to stdout of the JobClient
			getTaskLogs(taskId, new URL(taskLogUrl + "&filter=stdout"),
					System.out);

			// Copy task's stderr to stderr of the JobClient
			getTaskLogs(taskId, new URL(taskLogUrl + "&filter=stderr"),
					System.err);
		}
	}

	private void downloadProfile(TaskCompletionEvent e) throws IOException {
		URLConnection connection = new URL(getTaskLogURL(e.getTaskAttemptId(),
				e.getTaskTrackerHttp()) + "&filter=profile").openConnection();
		InputStream in = connection.getInputStream();
		OutputStream out = new FileOutputStream(e.getTaskAttemptId()
				+ ".profile");
		IOUtils.copyBytes(in, out, 64 * 1024, true);
	}

	/**
	 * Some methods rely on having a recent job status object. Refresh it, if
	 * necessary
	 */
	synchronized void ensureFreshStatus() throws IOException,
			InterruptedException {
		if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
			updateStatus();
		}
	}

	private void ensureNotSet(String attr, String msg) throws IOException {
		if (conf.get(attr) != null) {
			throw new IOException(attr + " is incompatible with " + msg
					+ " mode.");
		}
	}

	protected void ensureState(JobState state) throws IllegalStateException {
		if (state != this.state) {
			throw new IllegalStateException("Job in state " + this.state
					+ " instead of " + state);
		}
	}

	/**
	 * Fail indicated task attempt.
	 * 
	 * @param taskId
	 *            the id of the task to be terminated.
	 * @return <code>true</code> if the task attempt was successfully failed
	 * @throws IOException
	 */
	public boolean failTask(TaskAttemptID taskId) throws IOException,
			InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().killTask(taskId, true);
	}

	/**
	 * Gets the counters for this job. May return null if the job has been
	 * retired and the job is no longer in the completed job store.
	 * 
	 * @return the counters for this job.
	 * @throws IOException
	 */
	public Counters getCounters() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().getJobCounters(getJobID());
	}

	/**
	 * Get finish time of the job.
	 * 
	 * @return the finish time of the job
	 */
	public long getFinishTime() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.getFinishTime();
	}

	public String getHistoryUrl() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.getHistoryFile();
	}

	/**
	 * Get the path of the submitted job configuration.
	 * 
	 * @return the path of the submitted job configuration.
	 */
	public String getJobFile() {
		ensureState(JobState.RUNNING);
		return status.getJobFile();
	}

	/**
	 * The user-specified job name.
	 */
	public String getJobName() {
		if (state == JobState.DEFINE) {
			return super.getJobName();
		}
		ensureState(JobState.RUNNING);
		return status.getJobName();
	}

	/**
	 * Returns the current state of the Job.
	 * 
	 * @return the current state of the job, as a {@link JobStatus.State}
	 * @throws IOException
	 * @throws InterruptedException
	 */
	public JobStatus.State getJobState() throws IOException,
			InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.getState();
	}

	/**
	 * Get the priority of the job.
	 * 
	 * @return the priority of the job
	 */
	public JobPriority getPriority() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.getPriority();
	}

	/**
	 * Get scheduling info of the job.
	 * 
	 * @return the scheduling info of the job
	 */
	public String getSchedulingInfo() {
		ensureState(JobState.RUNNING);
		return status.getSchedulingInfo();
	}

	/**
	 * Get start time of the job.
	 * 
	 * @return the start time of the job
	 */
	public long getStartTime() {
		ensureState(JobState.RUNNING);
		return status.getStartTime();
	}

	public JobStatus getStatus() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status;
	}

	/**
	 * Get events indicating completion (success/failure) of component tasks.
	 * 
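	 * <p>Events arrive in batches; a caller typically advances
	 * <code>startFrom</code> by the number of events returned on each call,
	 * as {@link #monitorAndPrintJob()} does.</p>
	 * 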
	 * @param startFrom
	 *            index to start fetching events from
	 * @param numEvents
	 *            number of events to fetch
	 * @return an array of {@link TaskCompletionEvent}s
	 * @throws IOException
	 */
	public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
			int numEvents) throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().getTaskCompletionEvents(getJobID(),
				startFrom, numEvents);
	}

	/**
	 * Gets the diagnostic messages for a given task attempt.
	 * 
	 * @param taskid
	 *            the id of the task attempt
	 * @return the list of diagnostic messages for the task
	 * @throws IOException
	 */
	public String[] getTaskDiagnostics(TaskAttemptID taskid)
			throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().getTaskDiagnostics(taskid);
	}

	private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
			OutputStream out) {
		try {
			URLConnection connection = taskLogUrl.openConnection();
			BufferedReader input = new BufferedReader(new InputStreamReader(
					connection.getInputStream()));
			BufferedWriter output = new BufferedWriter(new OutputStreamWriter(
					out));
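			// Note: 'output' is deliberately left open; 'out' may be
			// System.out or System.err (see displayTaskLogs).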
			try {
				String logData = null;
				while ((logData = input.readLine()) != null) {
					if (logData.length() > 0) {
						output.write(taskId + ": " + logData + "\n");
						output.flush();
					}
				}
			} finally {
				input.close();
			}
		} catch (IOException ioe) {
			LOG.warn("Error reading task output" + ioe.getMessage());
		}
	}

	private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
		return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
	}

	/**
	 * Get the information of the current state of the tasks of a job.
	 * 
	 * @param type
	 *            Type of the task
	 * @return the list of task reports for the given type of task
	 * @throws IOException
	 */
	public TaskReport[] getTaskReports(TaskType type) throws IOException,
			InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().getTaskReports(getJobID(), type);
	}

	/**
	 * Get the URL where some job progress information will be displayed.
	 * 
	 * @return the URL where some job progress information will be displayed.
	 */
	public String getTrackingURL() {
		ensureState(JobState.RUNNING);
		return status.getTrackingUrl().toString();
	}

	/**
	 * Increment the job's iteration counter; used together with
	 * {@link #setIterative(boolean)} for iterative jobs.
	 */
	public void increaseIteration() {
		ensureState(JobState.RUNNING);
		long iteration = conf.getLong(JOB_ITERATION, 0);
		++iteration;
		conf.setLong(JOB_ITERATION, iteration);
	}

	/**
	 * Check if the job is finished or not. This is a non-blocking call.
	 * 
	 * @return <code>true</code> if the job is complete, else
	 *         <code>false</code>.
	 * @throws IOException
	 */
	public boolean isComplete() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.isJobComplete();
	}

	public boolean isRetired() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.isRetired();
	}

	/**
	 * Check if the job completed successfully.
	 * 
	 * @return <code>true</code> if the job succeeded, else <code>false</code>.
	 * @throws IOException
	 */
	public boolean isSuccessful() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		updateStatus();
		return status.getState() == JobStatus.State.SUCCEEDED;
	}

	/**
	 * Kill the running job. Blocks until all job tasks have been killed as
	 * well. If the job is no longer running, it simply returns.
	 * 
	 * @throws IOException
	 */
	public void killJob() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		cluster.getClient().killJob(getJobID());
	}

	/**
	 * Kill indicated task attempt.
	 * 
	 * @param taskId
	 *            the id of the task to be terminated.
	 * @return <code>true</code> if the task attempt was successfully killed
	 * @throws IOException
	 */
	public boolean killTask(TaskAttemptID taskId) throws IOException,
			InterruptedException {
		ensureState(JobState.RUNNING);
		return cluster.getClient().killTask(taskId, false);
	}

	/**
	 * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
	 * and 1.0. When all map tasks have completed, the function returns 1.0.
	 * 
	 * @return the progress of the job's map-tasks.
	 * @throws IOException
	 */
	public float mapProgress() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		ensureFreshStatus();
		return status.getMapProgress();
	}

	/**
	 * Monitor a job and print status in real-time as progress is made and tasks
	 * fail.
	 * 
	 * @return true if the job succeeded
	 * @throws IOException
	 *             if communication to the JobTracker fails
	 */
	public boolean monitorAndPrintJob() throws IOException,
			InterruptedException {
		String lastReport = null;
		Job.TaskStatusFilter filter;
		Configuration clientConf = cluster.getConf();
		filter = Job.getTaskOutputFilter(clientConf);
		JobID jobId = getJobID();
		LOG.info("Running job: " + jobId);
		int eventCounter = 0;
		boolean profiling = getProfileEnabled();
		IntegerRanges mapRanges = getProfileTaskRange(true);
		IntegerRanges reduceRanges = getProfileTaskRange(false);
		int progMonitorPollIntervalMillis = Job
				.getProgressPollInterval(clientConf);
		while (!isComplete()) {
			Thread.sleep(progMonitorPollIntervalMillis);
			String report = (" map "
					+ StringUtils.formatPercent(mapProgress(), 0) + " reduce " + StringUtils
					.formatPercent(reduceProgress(), 0));
			if (!report.equals(lastReport)) {
				LOG.info(report);
				lastReport = report;
			}

			TaskCompletionEvent[] events = getTaskCompletionEvents(
					eventCounter, 10);
			eventCounter += events.length;
			printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
		}
		LOG.info("Job complete: " + jobId);
		Counters counters = getCounters();
		if (counters != null) {
			LOG.info(counters.toString());
		}
		return isSuccessful();
	}

	private void printTaskEvents(TaskCompletionEvent[] events,
			Job.TaskStatusFilter filter, boolean profiling,
			IntegerRanges mapRanges, IntegerRanges reduceRanges)
			throws IOException, InterruptedException {
		for (TaskCompletionEvent event : events) {
			TaskCompletionEvent.Status status = event.getStatus();
			if (profiling
					&& shouldDownloadProfile()
					&& (status == TaskCompletionEvent.Status.SUCCEEDED || status == TaskCompletionEvent.Status.FAILED)
					&& (event.isMapTask() ? mapRanges : reduceRanges)
							.isIncluded(event.idWithinJob())) {
				downloadProfile(event);
			}
			switch (filter) {
			case NONE:
				break;
			case SUCCEEDED:
				if (event.getStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
					LOG.info(event.toString());
					displayTaskLogs(event.getTaskAttemptId(),
							event.getTaskTrackerHttp());
				}
				break;
			case FAILED:
				if (event.getStatus() == TaskCompletionEvent.Status.FAILED) {
					LOG.info(event.toString());
					// Displaying the task diagnostic information
					TaskAttemptID taskId = event.getTaskAttemptId();
					String[] taskDiagnostics = getTaskDiagnostics(taskId);
					if (taskDiagnostics != null) {
						for (String diagnostics : taskDiagnostics) {
							System.err.println(diagnostics);
						}
					}
					// Displaying the task logs
					displayTaskLogs(event.getTaskAttemptId(),
							event.getTaskTrackerHttp());
				}
				break;
			case KILLED:
				if (event.getStatus() == TaskCompletionEvent.Status.KILLED) {
					LOG.info(event.toString());
				}
				break;
			case ALL:
				LOG.info(event.toString());
				displayTaskLogs(event.getTaskAttemptId(),
						event.getTaskTrackerHttp());
				break;
			}
		}
	}

	/**
	 * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
	 * and 1.0. When all reduce tasks have completed, the function returns 1.0.
	 * 
	 * @return the progress of the job's reduce-tasks.
	 * @throws IOException
	 */
	public float reduceProgress() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		ensureFreshStatus();
		return status.getReduceProgress();
	}

	/**
	 * Set the given set of archives
	 * 
	 * @param archives
	 *            The list of archives that need to be localized
	 */
	public void setCacheArchives(URI[] archives) {
		ensureState(JobState.DEFINE);
		DistributedCache.setCacheArchives(archives, conf);
	}

	/**
	 * Set the given set of files
	 * 
	 * @param files
	 *            The list of files that need to be localized
	 */
	public void setCacheFiles(URI[] files) {
		ensureState(JobState.DEFINE);
		DistributedCache.setCacheFiles(files, conf);
	}

	/**
	 * Sets the flag that will allow the JobTracker to cancel the HDFS
	 * delegation tokens upon job completion. Defaults to true.
	 */
	public void setCancelDelegationTokenUponJobCompletion(boolean value) {
		ensureState(JobState.DEFINE);
		conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
	}

	/**
	 * Set the combiner class for the job.
	 * 
	 * @param cls
	 *            the combiner to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setCombinerClass(Class<? extends Reducer> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
	}

	/**
	 * Define the comparator that controls which keys are grouped together for a
	 * single call to
	 * {@link Reducer#reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
	 * 
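	 * <p>A common use is secondary sort: pair this with
	 * {@link #setSortComparatorClass(Class)} so that keys sort on a composite
	 * key but group on only its natural part, letting all values for one
	 * natural key reach a single <code>reduce</code> call.</p>
	 * 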
	 * @param cls
	 *            the raw comparator to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setGroupingComparatorClass(Class<? extends RawComparator> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setOutputValueGroupingComparator(cls);
	}

	/**
	 * Set the {@link InputFormat} for the job.
	 * 
	 * @param cls
	 *            the <code>InputFormat</code> to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setInputFormatClass(Class<? extends InputFormat> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
	}

	public void setIterative(boolean iterative) {
		ensureState(JobState.DEFINE);
		conf.setBoolean(JOB_ITERATIVE, iterative);
	}

	/**
	 * Set the job jar.
	 * 
	 * @param jar
	 *            the path to the job jar
	 */
	public void setJar(String jar) {
		ensureState(JobState.DEFINE);
		conf.setJar(jar);
	}

	/**
	 * Set the Jar by finding where a given class came from.
	 * 
	 * @param cls
	 *            the example class
	 */
	public void setJarByClass(Class<?> cls) {
		ensureState(JobState.DEFINE);
		conf.setJarByClass(cls);
	}

	/**
	 * Set the user-specified job name.
	 * 
	 * @param name
	 *            the job's new name.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setJobName(String name) throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setJobName(name);
	}

	/**
	 * Specify whether job-setup and job-cleanup are needed for the job
	 * 
	 * @param needed
	 *            If <code>true</code>, job-setup and job-cleanup will be
	 *            considered from {@link OutputCommitter} else ignored.
	 */
	public void setJobSetupCleanupNeeded(boolean needed) {
		ensureState(JobState.DEFINE);
		conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
	}
	
	/**
	 * Set the key class for the map output data. This allows the user to
	 * specify the map output key class to be different than the final output
	 * key class.
	 * 
	 * @param theClass
	 *            the map output key class.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setMapOutputKeyClass(Class<?> theClass)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setMapOutputKeyClass(theClass);
	}

	/**
	 * Set the value class for the map output data. This allows the user to
	 * specify the map output value class to be different than the final output
	 * value class.
	 * 
	 * @param theClass
	 *            the map output value class.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setMapOutputValueClass(Class<?> theClass)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setMapOutputValueClass(theClass);
	}

	/**
	 * Set the {@link Mapper} for the job.
	 * 
	 * @param cls
	 *            the <code>Mapper</code> to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setMapperClass(Class<? extends Mapper> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
	}

	/**
	 * Turn speculative execution on or off for this job for map tasks.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned on
	 *            for map tasks, else <code>false</code>.
	 */
	public void setMapSpeculativeExecution(boolean speculativeExecution) {
		ensureState(JobState.DEFINE);
		conf.setMapSpeculativeExecution(speculativeExecution);
	}

	/**
	 * Expert: Set the number of maximum attempts that will be made to run a map
	 * task.
	 * 
	 * @param n
	 *            the number of attempts per map task.
	 */
	public void setMaxMapAttempts(int n) {
		ensureState(JobState.DEFINE);
		conf.setMaxMapAttempts(n);
	}

	/**
	 * Expert: Set the number of maximum attempts that will be made to run a
	 * reduce task.
	 * 
	 * @param n
	 *            the number of attempts per reduce task.
	 */
	public void setMaxReduceAttempts(int n) {
		ensureState(JobState.DEFINE);
		conf.setMaxReduceAttempts(n);
	}

	/**
	 * Set the number of reduce tasks for the job.
	 * 
	 * @param tasks
	 *            the number of reduce tasks
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setNumReduceTasks(int tasks) throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setNumReduceTasks(tasks);
	}

	/**
	 * Set the {@link OutputFormat} for the job.
	 * 
	 * @param cls
	 *            the <code>OutputFormat</code> to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setOutputFormatClass(Class<? extends OutputFormat> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
	}

	/**
	 * Set the key class for the job output data.
	 * 
	 * @param theClass
	 *            the key class for the job output data.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setOutputKeyClass(Class<?> theClass)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setOutputKeyClass(theClass);
	}

	/**
	 * Set the value class for job outputs.
	 * 
	 * @param theClass
	 *            the value class for job outputs.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setOutputValueClass(Class<?> theClass)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setOutputValueClass(theClass);
	}

	/**
	 * Set the {@link Partitioner} for the job.
	 * 
	 * @param cls
	 *            the <code>Partitioner</code> to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setPartitionerClass(Class<? extends Partitioner> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
	}

	/**
	 * Set the priority of the job. If the job is running, the new priority is
	 * pushed to the cluster; otherwise it is recorded in the configuration.
	 * 
	 * @param priority
	 *            the new priority for the job.
	 * @throws IOException
	 */
	public void setPriority(JobPriority priority) throws IOException,
			InterruptedException {
		if (state == JobState.DEFINE) {
			conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
					.valueOf(priority.name()));
		} else {
			ensureState(JobState.RUNNING);
			cluster.getClient().setJobPriority(getJobID(), priority.toString());
		}
	}

	/**
	 * Set whether the system should collect profiler information for some of
	 * the tasks in this job. The information is stored in the user log
	 * directory.
	 * 
	 * @param newValue
	 *            true means it should be gathered
	 */
	public void setProfileEnabled(boolean newValue) {
		ensureState(JobState.DEFINE);
		conf.setProfileEnabled(newValue);
	}

	/**
	 * Set the profiler configuration arguments. If the string contains a '%s'
	 * it will be replaced with the name of the profiling output file when the
	 * task runs.
	 * 
	 * This value is passed to the task child JVM on the command line.
	 * 
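	 * <p>A sketch using hprof (the agent options shown are illustrative; a
	 * trailing <code>file=%s</code> is what enables client-side download of
	 * profiles, see {@link #setProfileEnabled(boolean)}):</p>
	 * <pre>
	 * job.setProfileParams(&quot;-agentlib:hprof=cpu=samples,heap=sites,&quot;
	 *     + &quot;depth=6,force=n,thread=y,verbose=n,file=%s&quot;);
	 * </pre>
	 * 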
	 * @param value
	 *            the configuration string
	 */
	public void setProfileParams(String value) {
		ensureState(JobState.DEFINE);
		conf.setProfileParams(value);
	}

	/**
	 * Set the ranges of maps or reduces to profile.
	 * {@link #setProfileEnabled(boolean)} must also be called.
	 * 
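	 * <p>A minimal sketch (the range shown is illustrative):</p>
	 * <pre>
	 * job.setProfileEnabled(true);
	 * job.setProfileTaskRange(true, &quot;0-2&quot;); // first three map tasks
	 * </pre>
	 * 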
	 * @param isMap
	 *            <code>true</code> to set the range for map tasks,
	 *            <code>false</code> for reduce tasks
	 * @param newValue
	 *            a set of integer ranges of the task ids, e.g. &quot;0-2,5&quot;
	 */
	public void setProfileTaskRange(boolean isMap, String newValue) {
		ensureState(JobState.DEFINE);
		conf.setProfileTaskRange(isMap, newValue);
	}

	/**
	 * Set the {@link Reducer} for the job.
	 * 
	 * @param cls
	 *            the <code>Reducer</code> to use
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setReducerClass(Class<? extends Reducer> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
	}

	/**
	 * Turn speculative execution on or off for this job for reduce tasks.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned on
	 *            for reduce tasks, else <code>false</code>.
	 */
	public void setReduceSpeculativeExecution(boolean speculativeExecution) {
		ensureState(JobState.DEFINE);
		conf.setReduceSpeculativeExecution(speculativeExecution);
	}

	/**
	 * Define the comparator that controls how the keys are sorted before they
	 * are passed to the {@link Reducer}.
	 * 
	 * @param cls
	 *            the raw comparator
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setSortComparatorClass(Class<? extends RawComparator> cls)
			throws IllegalStateException {
		ensureState(JobState.DEFINE);
		conf.setOutputKeyComparatorClass(cls);
	}

	/**
	 * Turn speculative execution on or off for this job.
	 * 
	 * @param speculativeExecution
	 *            <code>true</code> if speculative execution should be turned
	 *            on, else <code>false</code>.
	 */
	public void setSpeculativeExecution(boolean speculativeExecution) {
		ensureState(JobState.DEFINE);
		conf.setSpeculativeExecution(speculativeExecution);
	}

	/**
	 * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
	 * and 1.0. When all setup tasks have completed, the function returns 1.0.
	 * 
	 * @return the progress of the job's setup-tasks.
	 * @throws IOException
	 */
	public float setupProgress() throws IOException, InterruptedException {
		ensureState(JobState.RUNNING);
		ensureFreshStatus();
		return status.getSetupProgress();
	}

	/**
	 * Default to the new APIs unless they are explicitly set or the old mapper
	 * or reducer attributes are used.
	 * 
	 * @throws IOException
	 *             if the configuration is inconsistent
	 */
	protected void setUseNewAPI() throws IOException {
		int numReduces = conf.getNumReduceTasks();
		String oldMapperClass = "mapred.mapper.class";
		String oldReduceClass = "mapred.reducer.class";
		conf.setBooleanIfUnset("mapred.mapper.new-api",
				conf.get(oldMapperClass) == null);
		if (conf.getUseNewMapper()) {
			String mode = "new map API";
			ensureNotSet("mapred.input.format.class", mode);
			ensureNotSet(oldMapperClass, mode);
			if (numReduces != 0) {
				ensureNotSet("mapred.partitioner.class", mode);
			} else {
				ensureNotSet("mapred.output.format.class", mode);
			}
		} else {
			String mode = "map compatability";
			ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
			ensureNotSet(MAP_CLASS_ATTR, mode);
			if (numReduces != 0) {
				ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
			} else {
				ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
			}
		}
		if (numReduces != 0) {
			conf.setBooleanIfUnset("mapred.reducer.new-api",
					conf.get(oldReduceClass) == null);
			if (conf.getUseNewReducer()) {
				String mode = "new reduce API";
				ensureNotSet("mapred.output.format.class", mode);
				ensureNotSet(oldReduceClass, mode);
			} else {
				String mode = "reduce compatability";
				ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
				ensureNotSet(REDUCE_CLASS_ATTR, mode);
			}
		}
	}

	/**
	 * Set the reported username for this job.
	 * 
	 * @param user
	 *            the username for this job.
	 */
	public void setUser(String user) {
		ensureState(JobState.DEFINE);
		conf.setUser(user);
	}

	/**
	 * Set the current working directory for the default file system.
	 * 
	 * @param dir
	 *            the new current working directory.
	 * @throws IllegalStateException
	 *             if the job is submitted
	 */
	public void setWorkingDirectory(Path dir) throws IOException {
		ensureState(JobState.DEFINE);
		conf.setWorkingDirectory(dir);
	}

	/**
	 * @return true if the profile parameters indicate that this is using hprof,
	 *         which generates profile files in a particular location that we
	 *         can retrieve to the client.
	 */
	private boolean shouldDownloadProfile() {
		// Check the argument string that was used to initialize profiling.
		// If this indicates hprof and file-based output, then we're ok to
		// download.
		String profileParams = getProfileParams();

		if (null == profileParams) {
			return false;
		}

		// Split this on whitespace.
		String[] parts = profileParams.split("[ \\t]+");

		// If any of these indicate hprof, and the use of output files, return
		// true.
		boolean hprofFound = false;
		boolean fileFound = false;
		for (String p : parts) {
			if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
				hprofFound = true;

				// This contains a number of comma-delimited components, one
				// of which may specify the file to write to. Make sure this
				// is present and not empty.
				String[] subparts = p.split(",");
				for (String sub : subparts) {
					if (sub.startsWith("file=")
							&& sub.length() != "file=".length()) {
						fileFound = true;
					}
				}
			}
		}

		return hprofFound && fileFound;
	}

	/**
	 * Submit the job to the cluster and return immediately.
	 * 
	 * @throws IOException
	 */
	public void submit() throws IOException, InterruptedException,
			ClassNotFoundException {
		ensureState(JobState.DEFINE);
		setUseNewAPI();
		status = new JobSubmitter(cluster.getFileSystem(), cluster.getClient())
				.submitJobInternal(this, cluster);
		state = JobState.RUNNING;
	}

	/**
	 * Render the current job status as a human-readable string.
	 */
	@Override
	public String toString() {
		ensureState(JobState.RUNNING);
		try {
			updateStatus();
		} catch (IOException e) {
			// ignore; fall through with the last known status
		} catch (InterruptedException ie) {
			// ignore; fall through with the last known status
		}
		StringBuffer sb = new StringBuffer();
		sb.append("Job: ").append(status.getJobID()).append("\n");
		sb.append("Job File: ").append(status.getJobFile()).append("\n");
		sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
		sb.append("\n");
		sb.append("map() completion: ");
		sb.append(status.getMapProgress()).append("\n");
		sb.append("reduce() completion: ");
		sb.append(status.getReduceProgress()).append("\n");
		sb.append("Job state: ");
		sb.append(status.getState()).append("\n");
		sb.append("history URL: ");
		sb.append(status.getHistoryFile()).append("\n");
		sb.append("retired: ").append(status.isRetired());
		return sb.toString();
	}

	/**
	 * Some methods need an up-to-date status object, so refresh it immediately.
	 * 
	 * @throws IOException
	 */
	synchronized void updateStatus() throws IOException, InterruptedException {
		this.status = cluster.getClient().getJobStatus(status.getJobID());
		if (this.status == null) {
			throw new IOException("Job status not available ");
		}
		this.statustime = System.currentTimeMillis();
	}
	
	/**
	 * Submit the job to the cluster and wait for it to finish.
	 * 
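	 * <p>A minimal sketch of tuning the client polling intervals before
	 * submission (the values are illustrative; the keys are the constants
	 * declared on this class):</p>
	 * <pre>
	 * conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);
	 * conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);
	 * </pre>
	 * 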
	 * @param verbose
	 *            print the progress to the user
	 * @return true if the job succeeded
	 * @throws IOException
	 *             thrown if the communication with the <code>JobTracker</code>
	 *             is lost
	 */
	public boolean waitForCompletion(boolean verbose) throws IOException,
			InterruptedException, ClassNotFoundException {
		if (state == JobState.DEFINE) {
			submit();
		}
		if (verbose) {
			monitorAndPrintJob();
		} else {
			// get the completion poll interval from the client.
			int completionPollIntervalMillis = Job
					.getCompletionPollInterval(cluster.getConf());
			while (!isComplete()) {
				try {
					Thread.sleep(completionPollIntervalMillis);
				} catch (InterruptedException ie) {
					// ignore and poll again
				}
			}
		}
		return isSuccessful();
	}
}
